package me.seu.demo.service.netty.handler;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import me.seu.demo.service.netty.NettyTcpServer;
import me.seu.demo.service.netty.connection.ConnectionManager;
import me.seu.demo.service.netty.constants.ProtocolConstants;
import me.seu.demo.service.netty.constants.SysConstants;
import me.seu.demo.service.netty.enums.EventEnum;
import me.seu.demo.service.netty.message.GpsMessage;
import me.seu.demo.service.netty.utils.ByteUtils;
import me.seu.demo.service.netty.utils.CommUtils;
import me.seu.demo.service.netty.utils.NetUtils;
import org.apache.commons.lang3.StringUtils;

import java.net.InetSocketAddress;
import java.net.SocketAddress;


/**
 * TCP业务处理handler
 * Sharable 注解 标注一个channel handler可以被多个channel安全地共享。
 * 为了安全地被用于多个并发的Channel（即连接），这样的ChannelHandler必须是线程安全的。
 * <p>
 * Netty中处理输入输出字节的最重要的类就是ChannelInboundHandler（处理输入字节）、ChannelOutboundHandler（处理输出字节）;
 *
 * </p>
 *
 * @author liangfeihu
 * @number 53669
 * @since 2021/3/26 下午2:44
 */
@Slf4j
@ChannelHandler.Sharable
public class TcpServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Invoked if data are read from the Channel.
     * 处理进来的数据用来响应释放资源。
     * Netty 在 ByteBuf 上使用了资源池，所以当执行释放资源时可以减少内存的消耗。
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("[TcpServerHandler] channelRead come in one message");

        try {
            if (msg instanceof GpsMessage) {
                GpsMessage gpsMsg = (GpsMessage) msg;
                log.info("[TcpServerHandler] channelRead one 0x{} message", ByteUtils.byteToHexString(gpsMsg.getProtocolNo()));
                if (ProtocolConstants.PROTOCOL_LOGIN == gpsMsg.getProtocolNo()) {
                    // 回应登录验证消息
                    Process process = ProcessManager.getInstance().getProcess(EventEnum.LOGIN);
                    process.execute(gpsMsg);
                } else {
                    String deviceId = CommUtils.getDeviceIdFromAttr(ctx);
                    if (StringUtils.isBlank(deviceId)) {
                        // 消息类型不是登录连接验证，而且deviceId参数为空 表示未验证
                        log.error("设备{}连接未验证", deviceId);
                        ctx.pipeline().close();
                        return;
                    }
                    // 处理业务数据
                    if (ProtocolConstants.PROTOCOL_HEARTBEAT == gpsMsg.getProtocolNo()) {
                        // 回应心跳包消息处理
                        Process process = ProcessManager.getInstance().getProcess(EventEnum.HEARTBEAT);
                        process.execute(gpsMsg);
                    } else {
                        // 非登录、心跳 消息事件包，直接放入内存，后台线程异步处理
                        NettyTcpServer.processRunnable.pushUpMsg(gpsMsg);
                    }
                }
            } else {
                log.error("[TcpServerHandler] error Object in channelRead:{}", msg.toString());
            }

        } catch (Exception ex) {
            log.error("[TcpServerHandler] TcpServerHandler handler error.", ex);
            throw ex;
        } finally {
            // 如果是GpsMessage，不是ReferenceCounted，将不会release
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * channel 注册到一个 EventLoop.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("TcpServerHandler step1 Connected from {" +
                NetUtils.channelToString(ctx.channel().remoteAddress(), ctx.channel().localAddress()) + "}");
    }

    /**
     * channel已创建但未注册到一个 EventLoop.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.debug("TcpServerHandler step5 Disconnected from {" +
                NetUtils.channelToString(ctx.channel().remoteAddress(), ctx.channel().localAddress()) + "}");
        String deviceId = CommUtils.getDeviceIdFromAttr(ctx);
        if (StringUtils.isNotBlank(deviceId)) {
            log.info("remove a connetion:{}, from connectionmanager", deviceId);
            ConnectionManager.getInstance().removeConn(deviceId);
        }
        ctx.channel().attr(SysConstants.DEVICE_ID).remove();
    }

    /**
     * channel 变为活跃状态(连接到了远程主机)，现在可以接收和发送数据了
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        log.info("TcpServerHandler step2 channelActive from (" + getRemoteAddress(ctx) + ")");
    }

    /**
     * channel 处于非活跃状态，没有连接到远程主机
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        log.debug("TcpServerHandler step3 channelInactive from (" + getRemoteAddress(ctx) + ")");
        String deviceId = CommUtils.getDeviceIdFromAttr(ctx);
        if (StringUtils.isNotBlank(deviceId)) {
            log.warn("TcpServerHandler step4 channelInactive, close channel deviceId -> " + deviceId + ", ctx -> " + ctx.toString());
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warn("TcpServerHandler (" + getRemoteAddress(ctx) + ") -> Unexpected exception from downstream." + cause);
        String deviceId = CommUtils.getDeviceIdFromAttr(ctx);
        if (StringUtils.isNotBlank(deviceId)) {
            log.error("TcpServerHandler exceptionCaught (deviceId -> " + deviceId + ", ctx -> " + ctx.toString() + ") -> Unexpected exception from downstream." + cause);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.READER_IDLE) {
                // 空闲时间过久，关闭连接
                log.info("空闲超过心跳时间，断开连接 deviceId:{}", CommUtils.getDeviceIdFromAttr(ctx));
                ctx.pipeline().close();
            }
        }
    }

    private String getRemoteAddress(ChannelHandlerContext ctx) {
        SocketAddress remote1 = ctx.channel().remoteAddress();
        InetSocketAddress remote = (InetSocketAddress) remote1;
        return NetUtils.toAddressString(remote);
    }

    private String getLocalAddress(ChannelHandlerContext ctx) {
        SocketAddress local1 = ctx.channel().localAddress();
        InetSocketAddress local = (InetSocketAddress) local1;
        return NetUtils.toAddressString(local);
    }

}
